package com.katesoft.scale4j.rttp.jobs;

import com.katesoft.scale4j.log.Logger;
import com.katesoft.scale4j.log.LogFactory;
import org.perf4j.StopWatch;
import org.perf4j.log4j.Log4JStopWatch;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.launch.JobLauncher;

import java.io.Serializable;

import static com.katesoft.scale4j.rttp.client.ISpringContextAccessor.ACCESSOR;

/**
 * This class is bridge between quartz scheduler and spring batch.
 * <p/>
 * Quartz will fire up distributed job execution and this class will copy job properties and will fire spring batch job execution.
 * <p/>
 * 2 properties must be part of JobExecutionContext('rttp.jobLauncher', 'rttp.jobLocator') - if client does not provide,
 * those 2 properties will be injected by
 * framework, and there is 1 optional property 'rttp.jobParametersIncrementer'(reference to JobParametersIncrementer bean).
 * <p/>
 * Please refer to http://static.springsource.org/spring-batch/reference/html-single/index.html#JobParametersIncrementer for more details.
 *
 * @author Dave Syer
 * @author kate2007
 */
public class SpringBatchJobLauncher implements Job, Serializable
{
    private static Logger LOGGER = LogFactory.getLogger(SpringBatchJobLauncher.class);
    //
    public static final String JOB_LAUNCHER = "rttp.jobLauncher";
    public static final String JOB_LOCATOR = "rttp.jobLocator";
    public static final String JOB_PARAMETERS_INCREMENTER = "rttp.jobParametersIncrementer";

    @SuppressWarnings({"unchecked"})
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException
    {
        JobDataMap jobDataMap = context.getMergedJobDataMap();
        String jobName = (String) jobDataMap.get(JobsUtility.JOB_NAME);
        LOGGER.info("Quartz trigger fired for spring batch jobName=%s", jobName);
        JobParameters jobParameters = JobsUtility.covertJobParameters(jobDataMap, true);
        //
        JobLauncher jobLauncher = (JobLauncher) ACCESSOR.getBean(jobDataMap.getString(JOB_LAUNCHER));
        JobRegistry jobLocator = (JobRegistry) ACCESSOR.getBean(jobDataMap.getString(JOB_LOCATOR));
        JobParametersIncrementer jobParametersIncrementer = jobDataMap.containsKey(JOB_PARAMETERS_INCREMENTER) ?
            ACCESSOR.getBean(jobDataMap.getString(JOB_PARAMETERS_INCREMENTER), JobParametersIncrementer.class) : null;
        //
        if (jobParametersIncrementer == null) { jobParametersIncrementer = new QuartzSchedulerParametersIncrementer(context.getFireTime()); }
        jobParameters = jobParametersIncrementer.getNext(jobParameters);
        StopWatch stopWatch = new Log4JStopWatch(String.format("rttp.spring.batch.runner_%s", jobName));
        try {
            LOGGER.info("launching spring batch job %s using[job_launcher=%s, job_locator=%s, jobParameters=%s]", jobName, jobLauncher, jobLocator,
                        jobParameters);
            jobLauncher.run(jobLocator.getJob(jobName), jobParameters);
        }
        catch (org.springframework.batch.core.JobExecutionException e) {
            LOGGER.warn(e);
            throw new JobExecutionException(e);
        }
        finally {
            stopWatch.setMessage(String.format("job %s finished execution", jobName));
            stopWatch.stop();
        }
    }
}
